package gl.java.network.transport.kcp.udp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.AbstractNioChannel;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SocketUtils;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
 */
@Slf4j
public class UdpServerChannel extends AbstractNioMessageChannel {
    private ChannelConfig config;
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    private final Map<SocketAddress, UdpServerChildChannel> connectingMap = new HashMap();

    private static DatagramChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each DatagramChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openDatagramChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    /**
     * @param parent
     * @param ch
     * @param readInterestOp
     * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
     */
    public UdpServerChannel(Channel parent, DatagramChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
        config = new UdpChannelConfig(this, ch.socket());
        config.setOption(ChannelOption.SO_BROADCAST, true);

    }

    public UdpServerChannel() {
        this(null, newSocket(DEFAULT_SELECTOR_PROVIDER), SelectionKey.OP_READ);
    }

    @Override
    protected int doReadMessages(List buf) {
        DatagramChannel ch = javaChannel();
        ChannelConfig config = config();
        RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

        ByteBuf data = allocHandle.allocate(config.getAllocator());
        allocHandle.attemptedBytesRead(data.writableBytes());
        boolean free = true;
        try {
            ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
            int pos = nioData.position();
            SocketAddress receive = ch.receive(nioData);
            InetSocketAddress remoteAddress = (InetSocketAddress) receive;

            if (remoteAddress == null) {
                log.warn("doReadMessages.remote address is null,:" + ch.getRemoteAddress());
                return 0;
            }
            allocHandle.lastBytesRead(nioData.position() - pos);
            buf.add(UdpUtils.newUdpPacket(data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()),
                    (InetSocketAddress) ch.getLocalAddress(), remoteAddress));
            log.debug("UdpServerChannel.doReadMessages packet,refCnt:{}, len:{},from:{}", data.refCnt() , (nioData.position() - pos) , remoteAddress);
            free = false;

            return 1;
        } catch (Throwable cause) {
            PlatformDependent.throwException(cause);
            return -1;
        } finally {
            if (free) {
                data.release();
            }
        }
    }

    @Override
    protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
        DatagramPacket packet = (DatagramPacket) msg;
        ByteBuf data = packet.content();
        final int dataLen = data.readableBytes();
        log.info("UdpServerChannel.doWriteMessage.len:" + dataLen);
        if (dataLen == 0) {
            return true;
        }
        InetSocketAddress remoteAddress = packet.recipient();
        if (remoteAddress == null) {
            log.warn("UdpServerChannel.doWriteMessage.remoteAddress is null");
            return false;
        }
        final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
        return javaChannel().send(nioData, remoteAddress) > 0;
    }

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        return false;
    }

    @Override
    protected void doFinishConnect() throws Exception {

    }

    @Override
    protected SocketAddress localAddress0() {
        return javaChannel().socket().getLocalSocketAddress();
    }


    @Override
    protected SocketAddress remoteAddress0() {
        return javaChannel().socket().getRemoteSocketAddress();
    }

    @Override
    public DatagramChannel javaChannel() {
        return (DatagramChannel) super.javaChannel();
    }

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            SocketUtils.bind(javaChannel(), localAddress);
        } else {
            javaChannel().socket().bind(localAddress);
        }
        log.debug("doBind");
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        log.debug("bind");
        return super.bind(localAddress);
    }

    @Override
    protected void doDisconnect() throws Exception {

    }

    @Override
    public ChannelConfig config() {
        return config;
    }

    @Override
    public boolean isActive() {
//        log.info("isActive");
        DatagramChannel ch = javaChannel();
        return ch.isOpen() && ch.socket().isBound();
    }

    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new UdpClientUnsafe();
    }

    public void send(Object msg) {
        try {
            doWriteMessage(msg, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private final class UdpClientUnsafe extends AbstractNioUnsafe {

        private final List<DatagramPacket> readBuf = new ArrayList();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);
            try {

                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());

                log.debug("UdpServerChannel.Unsafe.read.size:{}" , readBuf.size());
                for (int i = 0; i < readBuf.size(); i++) {
                    DatagramPacket packet = readBuf.get(i);
                    UdpServerChildChannel childChannel = getOrCreateChildChannel(packet, UdpServerChannel.this);
                    if (!childChannel.isActive()) {
                        packet.release();
                        continue;
                    }
                    childChannel.pipeline().fireChannelRead(packet);

                }
                allocHandle.readComplete();
                readBuf.clear();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (!config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

    protected UdpServerChildChannel getOrCreateChildChannel(DatagramPacket datagramPacket, UdpServerChannel parent) {
        InetSocketAddress sender = datagramPacket.sender();
        if (connectingMap.containsKey(sender)) {
            return connectingMap.get(sender);
        }

        log.debug("create child,sender:{}" , sender);
        final UdpServerChildChannel udpChildChannel = new UdpServerChildChannel(parent, parent.javaChannel(), datagramPacket);
        parent.pipeline().fireChannelRead(udpChildChannel);
        parent.pipeline().fireChannelReadComplete();

        connectingMap.put(sender, udpChildChannel);

        return udpChildChannel;
    }

    public void doCloseChildChannel(Channel channel) {
        throw new UnsupportedOperationException();
    }
}
